Interconnecting with the UBShmTransport Based on the LD/ST Shared Memory Semantics.#3290
Open
zchuango wants to merge 21 commits intoapache:masterfrom
Open
Interconnecting with the UBShmTransport Based on the LD/ST Shared Memory Semantics.#3290zchuango wants to merge 21 commits intoapache:masterfrom
zchuango wants to merge 21 commits intoapache:masterfrom
Conversation
wwbmmm
reviewed
May 10, 2026
| #include "brpc/transport.h" | ||
|
|
||
| namespace brpc { | ||
| class UBShmTransport : public Transport { |
There was a problem hiding this comment.
Pull request overview
Adds a new UBRing-based shared-memory transport mode to brpc (IPC + optional ubs-mem backend) and wires it into the Socket/Transport framework, along with docs and a performance example.
Changes:
- Introduce UBRing transport (
SOCKET_MODE_UBRING) with endpoint handshake, polling, and ring manager infrastructure. - Add shared-memory backend abstraction (POSIX IPC + ubs-mem via dlopen’d SDK stubs/headers) plus timer utilities.
- Update build/docs/examples to expose the feature and provide a basic performance harness.
Reviewed changes
Copilot reviewed 43 out of 43 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| src/brpc/ubshm/ubs_mem/ubshmem_stub.cpp | Adds stub implementations of ubs-mem APIs for non-ubs environments/UT. |
| src/brpc/ubshm/ubs_mem/ubs_mem.h | Introduces ubs-mem C API header used by the UBS backend integration. |
| src/brpc/ubshm/ubs_mem/ubs_mem_def.h | Defines ubs-mem types/constants used by the UBS backend integration. |
| src/brpc/ubshm/ubs_mem/declare_shm_ubs.h | Declares the dynamically loaded ubs-mem function pointer table. |
| src/brpc/ubshm/ubr_trx.h | Defines core UBR transaction structures and states. |
| src/brpc/ubshm/ubr_msg.h | Defines UBR message chunk format used by the ring transport. |
| src/brpc/ubshm/ub_ring.h | Declares UBRing read/write and lifecycle APIs used by the endpoint. |
| src/brpc/ubshm/ub_ring_manager.h | Declares global manager for UBR transactions and link bookkeeping. |
| src/brpc/ubshm/ub_ring_manager.cpp | Implements UBR transaction manager and UB event callback plumbing. |
| src/brpc/ubshm/ub_helper.h | Declares UBRing global init/availability helpers. |
| src/brpc/ubshm/ub_helper.cpp | Implements global init/fini, availability flags, and polling init. |
| src/brpc/ubshm/ub_endpoint.h | Declares UB shared-memory endpoint and polling infrastructure. |
| src/brpc/ubshm/ub_endpoint.cpp | Implements handshake, polling loop, and I/O integration with Socket/InputMessenger. |
| src/brpc/ubshm/timer/timer_mgr.h | Declares timer module used by UBS cleanup/recovery flows. |
| src/brpc/ubshm/timer/timer_mgr.cpp | Implements epoll/kqueue-based timer dispatch for UBRing subsystems. |
| src/brpc/ubshm/shm/shm_ubs.h | Declares UBS backend shared-memory operations. |
| src/brpc/ubshm/shm/shm_ubs.cpp | Implements UBS backend via dynamically loaded ubs-mem SDK. |
| src/brpc/ubshm/shm/shm_mgr.h | Declares backend-agnostic SHM manager interface. |
| src/brpc/ubshm/shm/shm_mgr.cpp | Implements SHM manager selecting IPC vs UBS backend via flag. |
| src/brpc/ubshm/shm/shm_ipc.h | Declares POSIX IPC SHM backend operations. |
| src/brpc/ubshm/shm/shm_ipc.cpp | Implements POSIX IPC SHM backend operations. |
| src/brpc/ubshm/shm/shm_def.h | Adds SHM structs/constants used across SHM backends and UBRing. |
| src/brpc/ubshm/common/thread_lock.h | Adds RAII-style mutex/spin/rwlock/semaphore guard macros. |
| src/brpc/ubshm/common/common.h | Adds common macros/types/constants used throughout UBRing code. |
| src/brpc/ubshm_transport.h | Declares UBShmTransport implementing the Transport interface. |
| src/brpc/ubshm_transport.cpp | Implements transport selection between UBRing and TCP fallback paths. |
| src/brpc/transport_factory.cpp | Wires SOCKET_MODE_UBRING into transport creation/context init. |
| src/brpc/socket.h | Adds UB endpoint/connect friend declarations for Socket integration. |
| src/brpc/socket_mode.h | Adds SOCKET_MODE_UBRING enum value. |
| src/brpc/rdma_transport.cpp | Adjusts RDMA transport’s TCP fallback member initialization (currently broken). |
| src/brpc/input_messenger.h | Adds UB endpoint friend declaration to support message processing hooks. |
| src/brpc/input_messenger.cpp | Extends RDMA-special message queuing behavior to UBRing sockets. |
| src/brpc/controller.h | Guards latency_us() against unset begin time. |
| README.md | Adds docs link for UBRing. |
| README_cn.md | Adds docs link for UBRing (CN). |
| example/ubring_performance/test.proto | Adds proto for UBRing performance test example. |
| example/ubring_performance/server.cpp | Adds UBRing-capable perf test server example. |
| example/ubring_performance/client.cpp | Adds UBRing-capable perf test client example. |
| example/ubring_performance/CMakeLists.txt | Adds standalone CMake build for the performance example. |
| docs/en/ubring.md | Documents build/run/configuration and backend selection for UBRing. |
| docs/cn/ubring.md | Chinese documentation for UBRing build/run/configuration. |
| CMakeLists.txt | Adds WITH_UBRING option and compile definition wiring. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp; | ||
| } | ||
| _tcp_transport = std::make_shared<TcpTransport>(); | ||
| _tcp_transport = std::unique_ptr<TcpTransport>(); |
Comment on lines
+49
to
+54
| if (options.need_on_edge_trigger && _on_edge_trigger == NULL) { | ||
| _on_edge_trigger = ubring::UBShmEndpoint::OnNewDataFromTcp; | ||
| } | ||
| _tcp_transport = std::unique_ptr<TcpTransport>(new TcpTransport()); | ||
| _tcp_transport->Init(socket, options); | ||
| } |
Comment on lines
+98
to
+114
| memcpy(shm_name, current_pos, SHM_MAX_NAME_BUFF_LEN); | ||
| } | ||
|
|
||
| std::string HelloMessage::toString() const { | ||
| constexpr size_t MAX_LEN = 16 + 6 + 16 + 6 + 16 + 6 + 20 + 6 + SHM_MAX_NAME_BUFF_LEN + 32; | ||
| std::array<char, MAX_LEN> buf; | ||
| int n = snprintf(buf.data(), buf.size(), | ||
| "msg_len=%u, hello_ver=%u, impl_ver=%u, len=%lu, shm_name=%.*s", | ||
| msg_len, | ||
| hello_ver, | ||
| impl_ver, | ||
| static_cast<unsigned long>(len), // 兼容32/64位 | ||
| static_cast<int>(SHM_MAX_NAME_BUFF_LEN), // 限制最大输出长度 | ||
| shm_name | ||
| ); | ||
| return std::string(buf.data(), static_cast<size_t>(n)); | ||
| } |
Comment on lines
+81
to
+97
| uint16_t* current_pos = (uint16_t*)data; | ||
| *(current_pos++) = butil::HostToNet16(msg_len); | ||
| *(current_pos++) = butil::HostToNet16(hello_ver); | ||
| *(current_pos++) = butil::HostToNet16(impl_ver); | ||
| uint64_t* len_pos = (uint64_t*)current_pos; | ||
| *len_pos = butil::HostToNet64(len); | ||
| current_pos += 4; | ||
| memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN); | ||
| } | ||
|
|
||
| void HelloMessage::Deserialize(void* data) { | ||
| uint16_t* current_pos = (uint16_t*)data; | ||
| msg_len = butil::NetToHost16(*current_pos++); | ||
| hello_ver = butil::NetToHost16(*current_pos++); | ||
| impl_ver = butil::NetToHost16(*current_pos++); | ||
| len = butil::NetToHost64(*(uint64_t*)current_pos); | ||
| current_pos += 4; // move forward 4 Bytes |
| auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get()); | ||
| size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE; | ||
| SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)s->fd()}; | ||
| const char* shm_name = butil::endpoint2str(s->local_side()).c_str(); |
Comment on lines
+210
to
+218
| uint32_t ubrIndex = trx->trxMgrIndex; | ||
| char* connectName = trx->localShm.name; | ||
| if (g_linkInfoMgr.linkMgrUnitStatus[ubrIndex] == UBR_MGR_UNIT_FREE) { | ||
| strncpy(g_linkInfoMgr.allLinkInfo[ubrIndex].connectName, | ||
| connectName, SHM_MAX_NAME_BUFF_LEN); | ||
| strncpy(g_linkInfoMgr.allLinkInfo[ubrIndex].listenerName, | ||
| listenerName, SHM_MAX_NAME_BUFF_LEN); | ||
| g_linkInfoMgr.linkMgrUnitStatus[ubrIndex] = UBR_MGR_UNIT_USED; | ||
| g_linkInfoMgr.linkNum++; |
Comment on lines
+515
to
+519
| LOCK_GUARD(shmList->shmLock); | ||
| if (UNLIKELY(shmList == NULL)) { | ||
| LOG(ERROR) << "Shm list is null."; | ||
| return UBRING_ERR; | ||
| } |
Comment on lines
+30
to
+67
| DEFINE_int32(ub_shm_type, 1, "shm type: 1-ipc; 2-ub_ring"); | ||
| static SHM_TYPE g_shmType; | ||
|
|
||
| static bool CheckInputShmParam(SHM *shm) { | ||
| if (shm == NULL) { | ||
| LOG(ERROR) << "Input Param shm is NULL."; | ||
| return false; | ||
| } | ||
|
|
||
| size_t nameLen = strlen(shm->name); | ||
| if (nameLen <= 0 || nameLen > SHM_MAX_NAME_LEN) { | ||
| LOG(ERROR) << "Shm name=" << shm->name << ", length=" << shm->len | ||
| << ", which is not between 1 and " << SHM_MAX_NAME_LEN; | ||
| return false; | ||
| } | ||
|
|
||
| if (shm->len <= 0) { | ||
| LOG(ERROR) << "Shm length=" << shm->len << " is invalid."; | ||
| return false; | ||
| } | ||
|
|
||
| if (shm->len < SHM_ALLOC_UNIT_SIZE || (shm->len & (SHM_ALLOC_UNIT_SIZE - 1)) != 0) { | ||
| LOG(ERROR) << "Shm length=" << shm->len << " need to be (1..n) * 4MB."; | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| RETURN_CODE ShmMgrInit(void) { | ||
| if (UNLIKELY(FLAGS_ub_shm_type >= (uint32_t)SHM_TYPE_UNSUPPORT)) { | ||
| LOG(ERROR) << "Shm type config=" << FLAGS_ub_shm_type << " is not supported."; | ||
| return UBRING_ERR; | ||
| } | ||
|
|
||
| g_shmType = (SHM_TYPE)FLAGS_ub_shm_type; | ||
| if (g_shmType == SHM_TYPE_UBS) { | ||
| if (UbsShmInit() != UBRING_OK) { |
| ${MCPACK2PB_SOURCES} | ||
| ${BRPC_SOURCES} | ||
| ${THRIFT_SOURCES} | ||
| ${BRPC_C_SOURCES} |
Comment on lines
+31
to
+54
| void UBShmTransport::Init(Socket *socket, const SocketOptions &options) { | ||
| CHECK(_ub_ep == NULL); | ||
| if (options.socket_mode == SOCKET_MODE_UBRING) { | ||
| _ub_ep = new(std::nothrow)ubring::UBShmEndpoint(socket); | ||
| if (!_ub_ep) { | ||
| const int saved_errno = errno; | ||
| PLOG(ERROR) << "Fail to create UBShmEndpoint"; | ||
| socket->SetFailed( | ||
| saved_errno, "Fail to create UBShmEndpoint: %s", berror(saved_errno)); | ||
| } | ||
| _ub_state = UB_UNKNOWN; | ||
| } else { | ||
| _ub_state = UB_OFF; | ||
| socket->_socket_mode = SOCKET_MODE_TCP; | ||
| } | ||
| _socket = socket; | ||
| _default_connect = options.app_connect; | ||
| _on_edge_trigger = options.on_edge_triggered_events; | ||
| if (options.need_on_edge_trigger && _on_edge_trigger == NULL) { | ||
| _on_edge_trigger = ubring::UBShmEndpoint::OnNewDataFromTcp; | ||
| } | ||
| _tcp_transport = std::unique_ptr<TcpTransport>(new TcpTransport()); | ||
| _tcp_transport->Init(socket, options); | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What problem does this PR solve?
Issue Number: #3226 #3167 #3217
Problem Summary:
After recent efforts, the UB-Ring framework has been successfully integrated with the BRPC transport framework. Currently, high-performance and low-latency communication based on the load/store (LD/ST) semantics is supported. I feel happy be able to contribute this to the community and look forward to receiving feedback and reviews. @wwbmmm @chenBright
What is changed and the side effects?
Changed:
Side effects:
Performance effects: NAN
Breaking backward compatibility:
Check List: